feat(ingestion): cross-connector filter visibility report#28355
feat(ingestion): cross-connector filter visibility report#28355harshach wants to merge 7 commits into
Conversation
Generalize Snowflake's per-database discovery + filter-reason logging (#28336) to a reusable helper used by every connector family — Database, Dashboard, Pipeline, Messaging, Storage, MLModel. At end of each source step, emit a single grep-friendly "FILTER VISIBILITY REPORT" block listing the discovered count, every filtered name + reason, and the kept count per entity type, so users can tell whether a missing entity was removed by includes/excludes or never visible to the ingestion role. Storage profile is the diff only: discovered counts (int per type) + the existing Status.filtered list (now carrying richer reasons). No discovered-name or kept-name lists; both are derivable. No spec/JSON schema change, no UI work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR adds a reusable ingestion-side logging utility to standardize “discovered → filtered → kept” visibility across connector families, culminating in a single grep-friendly FILTER VISIBILITY REPORT emitted at step close to help users distinguish source-permission invisibility from filter-pattern exclusion.
Changes:
- Introduces
metadata.utils.filter_visibilityhelpers (log_discovered,log_filtered,log_step_summary) and wires them into multiple connector base classes and concrete sources. - Extends
Statuswith an in-memorydiscovered_countsaccumulator to compute “kept” counts without storing full discovered/kept name lists. - Adds unit tests for helper behavior and report formatting.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| ingestion/tests/unit/utils/test_filter_visibility.py | New unit tests validating helper accumulation and report formatting. |
| ingestion/src/metadata/utils/filter_visibility.py | New centralized helper module for discovery/filter logging and consolidated report emission. |
| ingestion/src/metadata/ingestion/source/storage/storage_service.py | Uses helper for filtered container entries and emits step summary on close. |
| ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py | Logs discovered pipelines, logs filtered pipelines with rich reasons, emits step summary on close. |
| ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py | Logs discovered/filtered ML models using helper. |
| ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py | Emits step summary on close for MLModel sources. |
| ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py | Logs discovered/filtered ML models using helper. |
| ingestion/src/metadata/ingestion/source/messaging/messaging_service.py | Logs discovered/filtered topics and emits step summary on close. |
| ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py | Moves Snowflake database discovery/filter logging to shared helper. |
| ingestion/src/metadata/ingestion/source/database/redshift/metadata.py | Adds discovery + rich filter logging for databases via helper. |
| ingestion/src/metadata/ingestion/source/database/postgres/metadata.py | Adds discovery + rich filter logging for databases via helper. |
| ingestion/src/metadata/ingestion/source/database/mssql/metadata.py | Adds discovery + rich filter logging for databases via helper. |
| ingestion/src/metadata/ingestion/source/database/database_service.py | Instruments DB base-class filtering paths with helper-driven rich filter logging (and schema discovery logging). |
| ingestion/src/metadata/ingestion/source/database/common_db_source.py | Logs discovered tables/views, logs filtered tables/views with rich reasons, emits step summary on close. |
| ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py | Adds discovery + rich filter logging for databases/schemas via helper. |
| ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py | Logs discovered dashboards, logs filtered dashboards/projects, emits step summary on close. |
| ingestion/src/metadata/ingestion/api/status.py | Adds discovered_counts + record_discovered() to support report computations. |
…efenses
Observability code in the ingestion hot path must not fail connectors.
Adds three layers of defense:
1. Helper-internal try/except in log_discovered, log_filtered, and
log_step_summary. Any failure is logged once at WARN and swallowed.
2. Call-site try/except around log_step_summary in every base class
close() (database, dashboard, pipeline, messaging, storage, mlmodel)
so a summary failure can't mask the real cleanup work.
3. Per-entity-type cap on Status.filtered (MAX_FILTERED_ENTRIES_PER_TYPE
= 50_000). Past the cap, log_filtered only bumps a new
Status.filtered_counts dict so the true count stays accurate without
unbounded memory growth. The report annotates the truncation.
Test coverage doubled (12 -> 23): adds bounded-growth tests for the cap,
resilience tests for broken loggers / bad inputs / corrupted Status, and
an integration-style multi-database lifecycle test that verifies counts
are correct after the full streaming-log + summary cycle.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- log_discovered: skip list() materialization when DEBUG logging is off. Use len() directly for Sized inputs (zero alloc) and a streaming sum() for generators. Cuts the per-discovery cost from O(n) memory to O(1) on large catalogs when --debug isn't active. - _entity_type_from_reason: case-insensitive marker match so legacy reasons like "Database Filtered out" (lowercase 'out' from older BigQuery code paths) get bucketed into the right report section instead of being silently undercounted. - dashboard_service.get_dashboard: null-safe via `or []` (matches the Optional[List[Any]] contract Pipeline / Messaging sources already use); materialize names once into a list so log_discovered can take the zero-allocation Sized path instead of re-listing a generator over the heavy dashboard objects. - common_db_source.get_tables_name_and_type / dashboard_service: skip the O(n) shallow copy when query_*_names_and_types() / get_dashboards_list() already returned a list. Only materialize when a subclass returned a generator. - Report: rename "Will be published to OpenMetadata" → "Passed filter patterns" and prepend a note explaining the count is filter-decision only (does not subtract source-side extraction errors or secondary filters like projectFilterPattern). Avoids confusion when reconciling report counts with the final ingested set. Tests: 23 → 28 (added DEBUG-gated materialization, Sized len() path, case-insensitive marker, and clarifying-note assertions). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🔴 Playwright Results — 1 failure(s), 14 flaky✅ 4144 passed · ❌ 1 failed · 🟡 14 flaky · ⏭️ 91 skipped
Genuine Failures (failed on all attempts)❌
|
- mark_databases_as_deleted: pass add_to_status=False to
_get_filtered_database_names. Without this, every filtered database
was being recorded twice (once during main ingestion via the
per-connector get_database_names, once during the maintenance pass),
inflating Status.filtered + filtered_counts and skewing the report.
- _get_filtered_schema_names: only materialize the raw schema names
when add_to_status=True (where log_discovered needs the count up
front). The mark-deleted maintenance paths iterate once with
add_to_status=False — streaming saves O(n) memory on large schemas.
Same fix applied to BigQuery's override.
- dashboard/pipeline/messaging get_X(): compute names once into
(entity, name) tuples instead of calling the connector-specific
get_*_name() twice per entity (once to build the discovery list,
once inside the filter loop). Real savings when name extraction is
non-trivial (e.g., Tableau workbook lookups).
- DB connector log_filtered calls: use the raw entity name (database_name,
schema_name, table_name, view_name, new_database, project_id) as
`name` and the FQN as `matched_against`. The previous convention of
`name=<fqn>` produced confusing report output when useFqnForFiltering
was False ("filtered name = FQN, matched against = raw name") and
was inconsistent with non-SQL connectors (Dashboard/Pipeline/Topic/
MLModel) which already stored raw names. Report column is now
readable across all connector families.
Tests: existing 33 still pass (no test changes needed — the storage-key
change to raw names is verified by the existing dashboard/pipeline/topic
assertions which always asserted raw names).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three categories of new basedpyright errors introduced by the filter visibility refactor — all surfaced by the `--baselinemode=discard` CI gate: - log_step_summary: source_name was typed `str` but every call site passes `self.config.serviceName` which pyright infers as `str | None`. Widened to `str | None` and gracefully fall back to '<unknown source>' in the report header. - filter_by_schema / filter_by_table / filter_by_database: I extracted the `filter_name = X if useFqn else Y` ternary into a variable across several connectors. The inline ternary was fine but the extracted variable became `str | None` to pyright (because one branch is an FQN typed as Optional). Widened these three helpers in filters.py to accept `Optional[str]` — they already delegate to `_filter()` which handles None per its own contract. Aligns the wrapper signatures with the actual delegate behavior; one-file fix instead of casting at 9 call sites. - mlflow.get_mlmodels: the existing `cast(RegisteredModel, ...)` told pyright the search call returned a single model, so my `list(...)` wrap produced `list[tuple[str, Any]]` and broke `m.name` access. Fixed the cast to `cast(List[RegisteredModel], list(...))`. That surfaced a pre-existing latent bug: `model.latest_versions` is Optional but was iterated directly — guarded with `or []`. Verification: - `basedpyright -p pyproject.toml --baselinefile .basedpyright/baseline.json --baselinemode=discard` now reports zero new errors (the remaining errors are env-specific airflow/locust/pydoris import warnings already in the baseline; CI has those packages installed). - `make py_format_check` clean. - 79/79 helper + filter_pattern + common_db_source + snowflake tests still pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code Review ✅ Approved 4 resolved / 4 findingsGeneralizes filter visibility reporting across all connector families to provide clear, grep-friendly logs for entity filtering. All prior review findings regarding count aggregation, dashboard project naming, and memory management have been resolved. ✅ 4 resolved✅ Performance: Materializing full dashboard list may cause memory pressure
✅ Edge Case: 'kept' count may be misleading when dashboards fail detail extraction
✅ Edge Case: log_filtered called with list instead of string for project_name
✅ Edge Case: get_filtered_count undercounts in mixed helper+legacy usage
OptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
|
Code Review ✅ Approved 4 resolved / 4 findingsGeneralizes filter visibility reporting across all connector families to provide clear, grep-friendly logs for entity filtering. All prior review findings regarding count aggregation, dashboard project naming, and memory management have been resolved. ✅ 4 resolved✅ Performance: Materializing full dashboard list may cause memory pressure
✅ Edge Case: 'kept' count may be misleading when dashboards fail detail extraction
✅ Edge Case: log_filtered called with list instead of string for project_name
✅ Edge Case: get_filtered_count undercounts in mixed helper+legacy usage
OptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |



Describe your changes:
Generalizes #28336's per-database discovery + filter-reason logging into a reusable helper that every connector family — Database, Dashboard, Pipeline, Messaging, Storage, MLModel — now uses. At end of each source step it emits a single grep-friendly FILTER VISIBILITY REPORT block listing the discovered count, every filtered name + reason, and the kept count per entity type, so users can immediately tell whether a missing entity was removed by
includes/excludesor never visible to the ingestion role at all. Storage profile is the diff only — discovered counts (int per type) plus the existingStatus.filteredlist now carrying richer reasons; no discovered-name or kept-name lists, both are derivable.Type of change:
High-level design:
ingestion/src/metadata/utils/filter_visibility.py— three helpers (log_discovered,log_filtered,log_step_summary) own the message format centrally so every connector family produces identical output.Statusgaineddiscovered_counts: Dict[str, int](in-memory only, no spec/JSON schema change) so the report can compute kept = discovered − filtered.database_service.py(_get_filtered_database_names,_get_filtered_schema_names),common_db_source.py(get_tables_name_and_typefor tables + views),dashboard_service.py,pipeline_service.py,messaging_service.py,storage_service.py,mlmodel_service.py— each base class'sclose()emits the report.get_database_namesdiscovery lifted into Snowflake (replacing fix(snowflake): log discovered databases and filter-out reasons #28336's inline logging with helper calls — same output, central format), Postgres, BigQuery, Redshift, MSSQL. MySQL inherits the no-op single-DB default and needs no override.sagemaker/metadata.pyandmlflow/metadata.pyswitched their existingfilter_by_mlmodelcalls to use the helper so they participate in the report._get_filtered_database_namesis reached from both main ingestion and themark_databases_as_deletedmaintenance pass. Discovery logging is therefore placed in each connector'sget_database_names, not in*_raw, to avoid double-counting; a docstring on the base helper flags this for future contributors.Example output (Snowflake, Cvent-like config):
Tests:
Use cases covered
databaseFilterPattern.excludes, and can immediately grepFILTER VISIBILITY REPORTto confirm which databases got removed and whydashboardFilterPattern,projectFilterPattern), pipeline (pipelineFilterPattern), messaging (topicFilterPattern), storage (containerFilterPattern), mlmodel (mlModelFilterPattern)status.filter(name, "Database Filtered Out")callers predating this helper still get bucketed into the right entity-type section of the report (back-compat)Unit tests
ingestion/tests/unit/utils/test_filter_visibility.py— 12 tests covering count accumulation, generator inputs, optional kwargs, rich reason storage, consolidated report format, empty-state no-op, unrelated-reason skip, and legacy-reason-string back-compattest_snowflake,test_postgres,test_bigquery,test_redshift,test_mssql,test_mysql,test_common_db_source,test_tableau,test_looker,test_airflow,test_s3_storage,test_sagemaker— 1 skipped, 0 failed)Backend integration tests
Ingestion integration tests
Status.filteris still called with the same"<EntityType> Filtered Out"prefix; persistedStepSummary.filteredcount is unchanged)Playwright (UI) tests
Manual testing performed
source env/bin/activate && cd ingestion && pip install -e . --no-depspython -m pytest ingestion/tests/unit/utils/test_filter_visibility.py -v→ 12/12 passpython -m pytest <12 connector test files>→ 189/190 pass (1 skipped, 0 failed)cd ingestion && make py_format_check→ cleanUI screen recording / screenshots:
Not applicable.
Checklist:
🤖 Generated with Claude Code
Summary by Gitar
get_filtered_countinstatus.pyto correctly aggregate helper-driven and legacyfilter()counts without double-counting.project_nameto string indashboard_service.pyfor consistent formatting in visibility reports.test_filter_visibility.pyto verify accuracy under mixed helper and legacy caller scenarios.This will update automatically on new commits.